#   Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=doc-string-missing
import threading
import multiprocessing
import sys
import copy
if sys.version_info.major == 2:
    import Queue
elif sys.version_info.major == 3:
    import queue as Queue
else:
    raise Exception("Error Python version")
import os
import logging
import collections
import json
from .error_catch import ErrorCatch, CustomException, CustomExceptionCode, ParamChecker, ParamVerify
from .operator import Op, RequestOp, ResponseOp, VirtualOp
from .channel import (ThreadChannel, ProcessChannel, ChannelData,
                      ChannelDataType, ChannelStopError)
from .error_catch import  ProductErrCode
from .error_catch import CustomExceptionCode as ChannelDataErrcode
from .profiler import TimeProfiler, PerformanceTracer
from .util import NameGenerator, ThreadIdGenerator, PipelineProcSyncManager
from .proto import pipeline_service_pb2

_LOGGER = logging.getLogger(__name__)


class DAGExecutor(object):
    """
    DAG Executor, the service entrance of DAG.

    Builds and starts one DAG from the server config, pushes every RPC
    request into the DAG's input channel and returns the matching result to
    the calling thread. Requests and results are matched by data_id: each
    in-flight request owns a condition variable in ``_cv_pool`` and a slot in
    ``_fetch_buffer``. The background thread started by start() drains the
    DAG's output channel, fills the slot and notifies the waiter.

    Locking: ``_cv_for_cv_pool`` guards the ``_cv_pool``/``_fetch_buffer``
    bookkeeping. Whenever a per-request condition variable and
    ``_cv_for_cv_pool`` are held together, the condition variable is taken
    first, so the two locks cannot deadlock each other.
    """
    def __init__(self, response_op, server_conf, worker_idx):
        """
        Initialize DAGExecutor.

        Args:
            response_op: Response OP
            server_conf: server conf. config.yaml. Reads the keys
                "build_dag_each_worker", "worker_num" and "dag"; "dag" must
                provide "retry", "use_profile", "channel_size",
                "channel_recv_frist_arrive", "is_thread_op" and
                "tracer" -> "interval_s", while "enable_prometheus" and
                "prometheus_port" are optional.
            worker_idx: DAGExecutor index, PipelineServer creates many
                DAGExecutors when _build_dag_each_worker is true.

        Returns:
            None.
        """
        build_dag_each_worker = server_conf["build_dag_each_worker"]
        server_worker_num = server_conf["worker_num"]
        dag_conf = server_conf["dag"]

        self._retry = dag_conf["retry"]
        self._server_use_profile = dag_conf["use_profile"]
        self._enable_prometheus = False
        if "enable_prometheus" in dag_conf:
            self._enable_prometheus = dag_conf["enable_prometheus"]
        # The prometheus port is only kept when prometheus is enabled.
        if "prometheus_port" in dag_conf and self._enable_prometheus:
            self._prometheus_port = dag_conf["prometheus_port"]
        else:
            self._prometheus_port = None
        channel_size = dag_conf["channel_size"]
        channel_recv_frist_arrive = dag_conf["channel_recv_frist_arrive"]
        self._is_thread_op = dag_conf["is_thread_op"]

        tracer_conf = dag_conf["tracer"]
        tracer_interval_s = tracer_conf["interval_s"]

        self.name = "@DAGExecutor"
        self._profiler = TimeProfiler()
        self._profiler.enable(True)

        # Tracing is only enabled for a reporting interval of at least 1s.
        self._tracer = None
        if tracer_interval_s >= 1:
            self._tracer = PerformanceTracer(
                self._is_thread_op, tracer_interval_s, server_worker_num)
            if self._enable_prometheus:
                self._tracer.set_enable_dict(True)

        self._dag = DAG(self.name, response_op, self._server_use_profile,
                        self._prometheus_port, self._is_thread_op,
                        channel_size, build_dag_each_worker, self._tracer,
                        channel_recv_frist_arrive)
        (in_channel, out_channel, pack_rpc_func,
         unpack_rpc_func) = self._dag.build()
        self._dag.start()

        self._set_in_channel(in_channel)
        self._set_out_channel(out_channel)
        self._pack_rpc_func = pack_rpc_func
        self._unpack_rpc_func = unpack_rpc_func

        if self._tracer is not None:
            self._tracer.start()

        # generate id
        # data_id: Server Unique ID, automatically generated by the framework
        # log_id: Trace one product request, can be empty, not unique.
        # NOTE(review): presumably ThreadIdGenerator yields base_counter,
        # base_counter + step, ... so that per-worker DAGs never hand out the
        # same id -- confirm in util.ThreadIdGenerator.
        base_counter = 0
        gen_id_step = 1
        if build_dag_each_worker:
            base_counter = worker_idx
            gen_id_step = server_worker_num
        self._id_generator = ThreadIdGenerator(
            max_id=1000000000000000000,
            base_counter=base_counter,
            step=gen_id_step)

        # {data_id: threading.Condition} of requests waiting for a result.
        self._cv_pool = {}
        self._cv_for_cv_pool = threading.Condition()
        # {data_id: ChannelData or None}; None means "not arrived yet".
        self._fetch_buffer = {}
        self._recive_func = None

        self._client_profile_key = "pipeline.profile"
        self._client_profile_value = "1"

    @ErrorCatch
    def start(self):
        """
        Starting one daemon thread for receiving data from the last channel
        in the background (runs _recive_out_channel_func).

        Args:
            None

        Returns:
            None
        """
        self._recive_func = threading.Thread(
            target=DAGExecutor._recive_out_channel_func, args=(self, ))
        self._recive_func.daemon = True
        self._recive_func.start()
        _LOGGER.debug("[DAG Executor] Start recive thread")

    def stop(self):
        """
        Stopping DAG: stops the DAG and joins it.

        Args:
            None

        Returns:
            None
        """
        self._dag.stop()
        self._dag.join()
        _LOGGER.info("[DAG Executor] Stop")

    def _get_next_data_id(self):
        """
        Generate data_id incrementally and uniquely, and register a fresh
        condition variable plus an empty result slot for it.

        Args:
            None

        Returns:
            data_id: uniq id
            cond_v: condition variable
        """
        data_id = self._id_generator.next()
        cond_v = threading.Condition()
        with self._cv_for_cv_pool:
            self._cv_pool[data_id] = cond_v
            self._fetch_buffer[data_id] = None
        return data_id, cond_v

    def _set_in_channel(self, in_channel):
        """
        Set in_channel of DAG. Calls os._exit(-1) if it is not a channel.

        Args:
            in_channel: input channel of DAG

        Returns:
            None
        """
        if not isinstance(in_channel, (ThreadChannel, ProcessChannel)):
            _LOGGER.critical("[DAG Executor] Failed to set in_channel: "
                             "in_channel must be Channel type, but get {}".
                             format(type(in_channel)))
            os._exit(-1)

        self._in_channel = in_channel
        _LOGGER.info("[DAG] set in channel succ, name [{}]".format(self.name))

    def _set_out_channel(self, out_channel):
        """
        Set out_channel of DAG and register this executor as its consumer.
        Calls os._exit(-1) if it is not a channel.

        Args:
            out_channel: output channel of DAG

        Returns:
            None
        """
        if not isinstance(out_channel, (ThreadChannel, ProcessChannel)):
            _LOGGER.critical("[DAG Executor] Failed to set out_channel: "
                             "must be Channel type, but get {}".format(
                                 type(out_channel)))
            os._exit(-1)
        out_channel.add_consumer(self.name)
        self._out_channel = out_channel

    def _recive_out_channel_func(self):
        """
        Receiving data from the output channel, storing it into
        _fetch_buffer and waking the request waiting on that data_id.
        Function _get_channeldata_from_fetch_buffer picks the data up.

        Runs until the output channel raises ChannelStopError; every request
        still waiting then receives a CLOSED_ERROR ChannelData. Calls
        os._exit(-1) if the output channel delivers a malformed item.

        Args:
            None

        Returns:
            None
        """
        while True:
            try:
                channeldata_dict = self._out_channel.front(self.name)
            except ChannelStopError:
                _LOGGER.info("[DAG Executor] Stop.")
                self._fail_pending_requests()
                break
            if len(channeldata_dict) != 1:
                _LOGGER.critical(
                    "[DAG Executor] Failed to fetch result: out_channel "
                    "cannot have multiple input ops")
                os._exit(-1)
            (_, channeldata), = channeldata_dict.items()
            if not isinstance(channeldata, ChannelData):
                _LOGGER.critical(
                    "[DAG Executor] Failed to fetch result: data in "
                    "out_channel must be ChannelData type, but get {}".format(
                        type(channeldata)))
                os._exit(-1)

            data_id = channeldata.id
            _LOGGER.debug("(data_id={}) [recive thread] Fetched data".format(
                data_id))
            with self._cv_for_cv_pool:
                cond_v = self._cv_pool.get(data_id)
            if cond_v is None:
                # The id is no longer registered in _cv_pool; drop the data
                # instead of letting a KeyError kill this receive thread.
                _LOGGER.warning(
                    "(data_id={}) [recive thread] No request is waiting for "
                    "the fetched data, drop it".format(data_id))
                continue
            with cond_v:
                self._fetch_buffer[data_id] = channeldata
                cond_v.notify_all()

    def _fail_pending_requests(self):
        """
        Hand a CLOSED_ERROR ChannelData to every request still waiting for
        its result and wake it up. Called once the output channel stopped.

        The pool is snapshotted under _cv_for_cv_pool and that lock is
        released before any per-request condition variable is taken, which
        keeps the "condition variable first" lock order used by
        _get_channeldata_from_fetch_buffer. Results that already arrived but
        were not consumed yet are left untouched.
        """
        with self._cv_for_cv_pool:
            pending = list(self._cv_pool.items())
        for data_id, cv in pending:
            closed_error_data = ChannelData(
                error_code=ChannelDataErrcode.CLOSED_ERROR.value,
                error_info="dag closed.",
                data_id=data_id)
            with cv:
                with self._cv_for_cv_pool:
                    if (data_id in self._fetch_buffer and
                            self._fetch_buffer[data_id] is None):
                        self._fetch_buffer[data_id] = closed_error_data
                cv.notify_all()

    def _get_channeldata_from_fetch_buffer(self, data_id, cond_v):
        """
        Block until the result for data_id is in _fetch_buffer, then take it.

        The entry is removed from both _cv_pool and _fetch_buffer, so a
        retry has to register data_id again before pushing (see call()).

        Args:
            data_id: search key
            cond_v: conditional variable registered for data_id by
                _get_next_data_id()

        Returns:
            ready_data: one channel data processed
        """
        with cond_v:
            # Re-check after every wake-up: only a non-None slot means the
            # receive thread has delivered a result for this request.
            while True:
                with self._cv_for_cv_pool:
                    ready_data = self._fetch_buffer[data_id]
                    if ready_data is not None:
                        self._cv_pool.pop(data_id, None)
                        del self._fetch_buffer[data_id]
                        break
                cond_v.wait()
        _LOGGER.debug("(data_id={}) [resp thread] Got data".format(data_id))
        return ready_data

    def _pack_channeldata(self, rpc_request, data_id):
        """
        Unpacking data from RPC request, and creating one ChannelData.

        Never raises for a failing unpack function: such failures and
        product errors reported by it are turned into error ChannelData.

        Args:
           rpc_request: one RPC request
           data_id: data id, unique

        Returns:
            ChannelData: one channel data to be processed
        """
        log_id = None
        try:
            dictdata, log_id, prod_errcode, prod_errinfo = self._unpack_rpc_func(
                rpc_request)
        except Exception as e:
            _LOGGER.error(
                "(data_id={}) Failed to parse RPC request package: {}"
                .format(data_id, e),
                exc_info=True)
            return ChannelData(
                error_code=ChannelDataErrcode.RPC_PACKAGE_ERROR.value,
                error_info="rpc package error: {}".format(e),
                data_id=data_id,
                log_id=log_id)

        # because unpack_rpc_func is rewritten by user, we need to look
        # for product_errcode in returns, and client_profile_key field
        # in rpc_request
        if prod_errcode is not None:
            # product errors occured
            _LOGGER.error("unpack_rpc_func prod_errcode:{}".format(
                prod_errcode))
            return ChannelData(
                error_code=ChannelDataErrcode.PRODUCT_ERROR.value,
                error_info="",
                prod_error_code=prod_errcode,
                prod_error_info=prod_errinfo,
                data_id=data_id,
                log_id=log_id)

        profile_value = dictdata.get(self._client_profile_key)
        client_need_profile = (profile_value == self._client_profile_value)
        return ChannelData(
            datatype=ChannelDataType.DICT.value,
            dictdata=dictdata,
            data_id=data_id,
            log_id=log_id,
            client_need_profile=client_need_profile)

    def call(self, rpc_request):
        """
        DAGExecutor entrance function. There are 5 steps:
        1._get_next_data_id: Generate an incremental ID
        2._pack_channeldata: pack the channel data from request.
        3.retry loop (at least one attempt):
            a. push channel_data into _in_channel
            b. get_channeldata_from_fetch_buffer: get results.
            Only a TIMEOUT error is retried; success or any other error
            ends the loop.
        4._pack_for_rpc_resp: pack RPC responses
        5.profile: generate profile string and pack into response.

        Args:
            rpc_request: one RPC request

        Returns:
            rpc_resp: one RPC response
        """
        trace_buffer = None
        if self._tracer is not None:
            trace_buffer = self._tracer.data_buffer()

        data_id, cond_v = self._get_next_data_id()

        if not self._is_thread_op:
            start_call = self._profiler.record("call_{}#DAG-{}_0".format(
                data_id, data_id))
        else:
            start_call = self._profiler.record("call_{}#DAG_0".format(data_id))

        self._profiler.record("prepack_{}#{}_0".format(data_id, self.name))
        req_channeldata = self._pack_channeldata(rpc_request, data_id)
        self._profiler.record("prepack_{}#{}_1".format(data_id, self.name))

        log_id = req_channeldata.log_id
        _LOGGER.info("(data_id={} log_id={}) Succ Generate ID ".format(data_id,
                                                                       log_id))

        # Always make at least one attempt: with retry <= 0 there would be
        # no response to pack below.
        retry_num = max(self._retry, 1)
        resp_channeldata = None
        for i in range(retry_num):
            if i > 0:
                # The previous attempt's fetch removed this id from _cv_pool
                # and _fetch_buffer; register it again so the receive thread
                # can deliver the result of this retry.
                with self._cv_for_cv_pool:
                    self._cv_pool[data_id] = cond_v
                    self._fetch_buffer[data_id] = None
            _LOGGER.debug("(data_id={}) Pushing data into Graph engine".format(
                data_id))
            try:
                self._in_channel.push(req_channeldata, self.name)
            except ChannelStopError:
                _LOGGER.error("(data_id:{} log_id={})[DAG Executor] Stop".
                              format(data_id, log_id))
                with self._cv_for_cv_pool:
                    self._cv_pool.pop(data_id, None)
                    self._fetch_buffer.pop(data_id, None)
                return self._pack_for_rpc_resp(
                    ChannelData(
                        error_code=ChannelDataErrcode.CLOSED_ERROR.value,
                        error_info="dag closed.",
                        data_id=data_id))

            _LOGGER.debug("(data_id={} log_id={}) Wait for Graph engine...".
                          format(data_id, log_id))
            resp_channeldata = self._get_channeldata_from_fetch_buffer(data_id,
                                                                       cond_v)

            if resp_channeldata.error_code == ChannelDataErrcode.OK.value:
                _LOGGER.info("(data_id={} log_id={}) Succ predict".format(
                    data_id, log_id))
                break
            _LOGGER.error("(data_id={} log_id={}) Failed to predict: {}"
                          .format(data_id, log_id,
                                  resp_channeldata.error_info))
            if resp_channeldata.error_code != ChannelDataErrcode.TIMEOUT.value:
                break

            if i + 1 < retry_num:
                _LOGGER.warning(
                    "(data_id={} log_id={}) DAGExecutor retry({}/{})"
                    .format(data_id, log_id, i + 1, retry_num))

        _LOGGER.debug("(data_id={} log_id={}) Packing RPC response package"
                      .format(data_id, log_id))
        self._profiler.record("postpack_{}#{}_0".format(data_id, self.name))
        rpc_resp = self._pack_for_rpc_resp(resp_channeldata)
        self._profiler.record("postpack_{}#{}_1".format(data_id, self.name))
        if not self._is_thread_op:
            end_call = self._profiler.record("call_{}#DAG-{}_1".format(data_id,
                                                                       data_id))
        else:
            end_call = self._profiler.record("call_{}#DAG_1".format(data_id))

        if trace_buffer is not None:
            trace_buffer.put({
                "name": "DAG",
                "id": data_id,
                "succ":
                resp_channeldata.error_code == ChannelDataErrcode.OK.value,
                "actions": {
                    "call_{}".format(data_id): end_call - start_call,
                },
            })

        profile_str = self._profiler.gen_profile_str()
        if self._server_use_profile:
            sys.stderr.write(profile_str)

        # add profile info into rpc_resp
        if resp_channeldata.client_need_profile:
            profile_set = resp_channeldata.profile_data_set
            profile_set.add(profile_str)
            profile_value = "".join(list(profile_set))
            rpc_resp.key.append(self._client_profile_key)
            rpc_resp.value.append(profile_value)

        return rpc_resp

    def _pack_for_rpc_resp(self, channeldata):
        """
        Packing one RPC response with the user-provided pack function.

        If packing raises, a pipeline_service_pb2.Response carrying
        RPC_PACKAGE_ERROR is returned instead.

        Args:
            channeldata: one channel data to be packed

        Returns:
            resp: one RPC response
        """
        try:
            return self._pack_rpc_func(channeldata)
        except Exception as e:
            _LOGGER.error(
                "(data_id={}) Failed to pack RPC response package: {}"
                .format(channeldata.id, e),
                exc_info=True)
            resp = pipeline_service_pb2.Response()
            resp.err_no = ChannelDataErrcode.RPC_PACKAGE_ERROR.value
            resp.err_msg = "rpc package error: {}".format(e)
            return resp


class DAG(object):
    """
    Directed Acyclic Graph(DAG) engine, builds one DAG topology.
    """
    def __init__(self, request_name, response_op, use_profile, prometheus_port, is_thread_op,
                 channel_size, build_dag_each_worker, tracer,
                 channel_recv_frist_arrive):
        """
        Store the DAG configuration; the topology itself is built by build().

        Args:
            request_name: name of the DAG owner (DAGExecutor passes its own
                name, "@DAGExecutor").
            response_op: ResponseOp the DAG is traced back from.
            use_profile: server-side profile flag (dag "use_profile").
            prometheus_port: prometheus exporter port or None;
                _use_prometheus is True exactly when it is not None.
            is_thread_op: True for ThreadChannel-based DAGs; otherwise a
                PipelineProcSyncManager is created for ProcessChannels.
            channel_size: maxsize given to every generated channel.
            build_dag_each_worker: whether each worker builds its own DAG.
            tracer: PerformanceTracer, or None when tracing is disabled.
            channel_recv_frist_arrive: forwarded to every generated channel.
        """
        _LOGGER.info("{}, {}, {}, {}, {}, {} ,{} ,{} ,{}".format(
            request_name, response_op, use_profile, prometheus_port,
            is_thread_op, channel_size, build_dag_each_worker, tracer,
            channel_recv_frist_arrive))

        # NOTE(review): the annotations below are presumably consumed by
        # ParamChecker for argument validation -- keep them as they are.
        @ErrorCatch
        @ParamChecker
        def init_helper(self, request_name: str,
                        response_op,
                        use_profile: [bool, None],
                        prometheus_port: [int, None],
                        is_thread_op: bool,
                        channel_size,
                        build_dag_each_worker: [bool, None],
                        tracer,
                        channel_recv_frist_arrive):
            self._request_name = request_name
            self._response_op = response_op
            self._use_profile = use_profile
            self._prometheus_port = prometheus_port
            self._use_prometheus = (self._prometheus_port is not None)
            self._is_thread_op = is_thread_op
            self._channel_size = channel_size
            self._build_dag_each_worker = build_dag_each_worker
            self._tracer = tracer
            self._channel_recv_frist_arrive = channel_recv_frist_arrive
            if not self._is_thread_op:
                self._manager = PipelineProcSyncManager()

        init_helper(self, request_name, response_op, use_profile, prometheus_port, is_thread_op,
                    channel_size, build_dag_each_worker, tracer,
                    channel_recv_frist_arrive)
        # Only log here; the stray stdout print duplicated this message.
        _LOGGER.info("[DAG] Succ init")

    @staticmethod
    def get_use_ops(response_op):
        """
        Starting from ResponseOp, recursively traverse the front OPs. Getting
        all used ops and the post op list of each op (excluding ResponseOp)

        Args:
            response_op: ResponseOp

        Returns:
            used_ops: used ops, set
            succ_ops_of_use_op: op and the next op list, dict.
            
        """
        unique_names = set()
        used_ops = set()
        succ_ops_of_use_op = {}  # {op_name: succ_ops}
        que = Queue.Queue()
        que.put(response_op)
        while que.qsize() != 0:
            op = que.get()
            for pred_op in op.get_input_ops():
                if pred_op.name not in succ_ops_of_use_op:
                    succ_ops_of_use_op[pred_op.name] = []
                if op != response_op:
                    succ_ops_of_use_op[pred_op.name].append(op)
                if pred_op not in used_ops:
                    que.put(pred_op)
                    used_ops.add(pred_op)
                    # check the name of op is globally unique
                    if pred_op.name in unique_names:
                        _LOGGER.critical("Failed to get used Ops: the"
                                         " name of Op must be unique: {}".
                                         format(pred_op.name))
                        os._exit(-1)
                    unique_names.add(pred_op.name)
        return used_ops, succ_ops_of_use_op

    def _gen_channel(self, name_gen):
        """
        Create one channel named by the next value of ``name_gen``.

        A ThreadChannel is built for thread-op DAGs, otherwise a
        ProcessChannel bound to this DAG's PipelineProcSyncManager. Both get
        the configured maxsize and channel_recv_frist_arrive.

        Args:
            name_gen: NameGenerator providing the channel name

        Returns:
            channel: the newly created channel
        """
        shared_kwargs = {
            "maxsize": self._channel_size,
            "channel_recv_frist_arrive": self._channel_recv_frist_arrive,
        }
        if self._is_thread_op:
            channel = ThreadChannel(name=name_gen.next(), **shared_kwargs)
        else:
            channel = ProcessChannel(self._manager, name=name_gen.next(),
                                     **shared_kwargs)
        _LOGGER.debug("[DAG] Generate channel: {}".format(channel.name))
        return channel

    def _gen_virtual_op(self, name_gen):
        """
        Create one VirtualOp named by the next value of ``name_gen``.

        Args:
            name_gen: NameGenerator providing the op name

        Returns:
            vir_op: the newly created VirtualOp
        """
        op_name = name_gen.next()
        virtual_op = VirtualOp(name=op_name)
        _LOGGER.debug("[DAG] Generate virtual_op: {}".format(virtual_op.name))
        return virtual_op

    def _topo_sort(self, used_ops, response_op, out_degree_ops):
        r"""
        Topological sort of DAG, creates inverted multi-layers views.

        Starting from the op feeding ResponseOp, ops are peeled off level by
        level: an op joins the next level as soon as all of its successors
        (counted from ``out_degree_ops``) have been placed.

        Args:
            used_ops: ops used in DAG (as returned by get_use_ops()).
            response_op: response op; its first input op is the sort root.
            out_degree_ops: successor op list for each op name, dict. The
                output of get_use_ops().

        Returns:
            dag_views: the inverted hierarchical topology list. examples:
                DAG :[A -> B -> C -> E]
                            \-> D /
                dag_views: [[E], [C, D], [B], [A]]

            last_op: the last op in front of ResponseOp

        Calls os._exit(-1) after a critical log when the DAG does not have
        exactly one op without inputs (the RequestOp), or when not every
        used op could be placed into a view.
        """
        out_degree_num = {
            name: len(ops)
            for name, ops in out_degree_ops.items()
        }

        zero_indegree_num = sum(
            1 for op in used_ops if len(op.get_input_ops()) == 0)
        if zero_indegree_num != 1:
            _LOGGER.critical("Failed to topo sort: DAG must contain exactly "
                             "one RequestOp (Op without inputs), but got {}"
                             .format(zero_indegree_num))
            os._exit(-1)
        last_op = response_op.get_input_ops()[0]

        # topo sort to get dag_views
        dag_views = []
        sorted_op_num = 0
        current_level = [last_op]
        while current_level:
            next_level = []
            for op in current_level:
                sorted_op_num += 1
                for pred_op in op.get_input_ops():
                    out_degree_num[pred_op.name] -= 1
                    # All successors placed: the predecessor is ready.
                    if out_degree_num[pred_op.name] == 0:
                        next_level.append(pred_op)
            dag_views.append(current_level)
            current_level = next_level

        # Ops left unplaced mean a cycle or ops not reachable from last_op.
        if sorted_op_num < len(used_ops):
            _LOGGER.critical("Failed to topo sort: not legal DAG")
            os._exit(-1)

        return dag_views, last_op

    def _build_dag(self, response_op):
        """
        Building DAG, the most important function in class DAG. Core steps:
        1.get_use_ops: Getting used ops, and out degree op list for each op.
        2._topo_sort: Topological sort creates inverted multi-layers views.
        3.create channels and virtual ops.

        Whenever an op's successor is not in the immediately following view,
        a VirtualOp is inserted into that view to carry the data one level
        further, so every channel created here links two adjacent views.
        Ops of the same view whose predecessor-op sets are identical share a
        single input channel.

        Args:
            response_op: ResponseOp

        Returns:
            actual_ops: all OPs used in DAG, including virtual OPs (the
                RequestOp, i.e. the op without inputs, is excluded)
            channels: all channels used in DAG
            input_channel: the channel of first OP
            output_channel: the channel of last OP
            pack_func: pack_response_package function of response_op
            unpack_func: unpack_request_package function of request_op

        Calls os._exit(-1) after a critical log when response_op is None or
        when the DAG contains no op besides the RequestOp.
        """
        if response_op is None:
            _LOGGER.critical("Failed to build DAG: ResponseOp"
                             " has not been set.")
            os._exit(-1)
        # used_ops: ops upstream of response_op (RequestOp included);
        # out_degree_ops: {op_name: [successor ops]}, ResponseOp not listed.
        used_ops, out_degree_ops = DAG.get_use_ops(response_op)
        if not self._build_dag_each_worker:
            _LOGGER.info("================= USED OP =================")
            for op in used_ops:
                if not isinstance(op, RequestOp):
                    _LOGGER.info(op.name)
            _LOGGER.info("-------------------------------------------")
        # used_ops contains the RequestOp, so <= 1 means nothing else.
        if len(used_ops) <= 1:
            _LOGGER.critical(
                "Failed to build DAG: besides RequestOp and ResponseOp, "
                "there should be at least one Op in DAG.")
            os._exit(-1)
        if self._build_dag_each_worker:
            _LOGGER.info("Because `build_dag_each_worker` mode is used, "
                         "Auto-batching is set to the default config: "
                         "batch_size=1, auto_batching_timeout=None")
            for op in used_ops:
                op.use_default_auto_batching_config()

        dag_views, last_op = self._topo_sort(used_ops, response_op,
                                             out_degree_ops)
        # _topo_sort lists views from last_op backwards; reverse them so
        # the walk below goes from the RequestOp side towards last_op.
        dag_views = list(reversed(dag_views))
        if not self._build_dag_each_worker:
            _LOGGER.info("================== DAG ====================")
            for idx, view in enumerate(dag_views):
                _LOGGER.info("(VIEW {})".format(idx))
                for op in view:
                    _LOGGER.info("  [{}]".format(op.name))
                    for out_op in out_degree_ops[op.name]:
                        _LOGGER.info("    - {}".format(out_op.name))
            _LOGGER.info("-------------------------------------------")

        # create channels and virtual ops
        virtual_op_name_gen = NameGenerator("vir")
        channel_name_gen = NameGenerator("chl")
        virtual_ops = []
        channels = []
        input_channel = None
        # actual_view: ops (real and virtual) of the current level; it
        # starts as dag_views[0] and is then replaced by actual_next_view.
        actual_view = None
        for v_idx, view in enumerate(dag_views):
            if v_idx + 1 >= len(dag_views):
                break
            next_view = dag_views[v_idx + 1]
            if actual_view is None:
                actual_view = view
            actual_next_view = []
            # {op_name in actual_next_view: [pred ops in actual_view]}
            pred_op_of_next_view_op = {}
            for op in actual_view:
                # find actual succ op in next view and create virtual op
                for succ_op in out_degree_ops[op.name]:
                    if succ_op in next_view:
                        if succ_op not in actual_next_view:
                            actual_next_view.append(succ_op)
                        if succ_op.name not in pred_op_of_next_view_op:
                            pred_op_of_next_view_op[succ_op.name] = []
                        pred_op_of_next_view_op[succ_op.name].append(op)
                    else:
                        # create virtual op
                        # The successor lives further down; the virtual op
                        # takes its place in this level and points at it, so
                        # the next iteration can place it (or add another
                        # virtual op).
                        virtual_op = self._gen_virtual_op(virtual_op_name_gen)
                        virtual_ops.append(virtual_op)
                        out_degree_ops[virtual_op.name] = [succ_op]
                        actual_next_view.append(virtual_op)
                        pred_op_of_next_view_op[virtual_op.name] = [op]
                        virtual_op.add_virtual_pred_op(op)
            actual_view = actual_next_view
            # create channel
            processed_op = set()
            for o_idx, op in enumerate(actual_next_view):
                if op.name in processed_op:
                    continue
                channel = self._gen_channel(channel_name_gen)
                channels.append(channel)
                op.add_input_channel(channel)
                _LOGGER.info("op:{} add input channel.".format(op.name))
                pred_ops = pred_op_of_next_view_op[op.name]
                if v_idx == 0:
                    # Fed by the first view: this becomes the DAG's input
                    # channel; the RequestOp gets it as output channel below.
                    input_channel = channel
                else:
                    # if pred_op is virtual op, it will use ancestors as producers to channel
                    for pred_op in pred_ops:
                        pred_op.add_output_channel(channel)
                        _LOGGER.info("pred_op:{} add output channel".format(
                            pred_op.name))
                processed_op.add(op.name)
                # find same input op to combine channel
                # Later ops with exactly the same predecessor set reuse
                # this channel instead of getting one of their own.
                for other_op in actual_next_view[o_idx + 1:]:
                    if other_op.name in processed_op:
                        continue
                    other_pred_ops = pred_op_of_next_view_op[other_op.name]
                    if len(other_pred_ops) != len(pred_ops):
                        continue
                    same_flag = True
                    for pred_op in pred_ops:
                        if pred_op not in other_pred_ops:
                            same_flag = False
                            break
                    if same_flag:
                        other_op.add_input_channel(channel)
                        processed_op.add(other_op.name)
        # One extra channel fed by last_op; returned as output_channel.
        output_channel = self._gen_channel(channel_name_gen)
        channels.append(output_channel)
        last_op.add_output_channel(output_channel)
        _LOGGER.info("last op:{} add output channel".format(last_op.name))

        pack_func, unpack_func = None, None
        pack_func = response_op.pack_response_package

        # NOTE: aliases virtual_ops; real ops are appended to the same list.
        actual_ops = virtual_ops
        for op in used_ops:
            if len(op.get_input_ops()) == 0:
                # Set special features of the request op:
                # 1. set unpack function.
                # 2. set output channel.
                unpack_func = op.unpack_request_package
                op.add_output_channel(input_channel)
                continue
            actual_ops.append(op)

        for c in channels:
            _LOGGER.debug("Channel({}):\n\t- producers: {}\n\t- consumers: {}"
                          .format(c.name, c.get_producers(), c.get_consumers()))

        return (actual_ops, channels, input_channel, output_channel, pack_func,
                unpack_func)

    def get_channels(self):
        """
        Return the list of channels created by build().

        Raises AttributeError when build() has not been called yet, since
        that is where ``_channels`` is assigned.
        """
        channels = self._channels
        return channels

    def build(self):
        """
        Build the DAG topology and remember its entry and exit points.

        Runs _build_dag() on the ResponseOp given at construction, stores
        every returned piece on the instance (_actual_ops, _channels,
        _input_channel, _output_channel, _pack_func, _unpack_func) and, if a
        tracer is configured, hands it the channel list.

        Returns:
            (input_channel, output_channel, pack_func, unpack_func): the
            channel of the first OP, the channel of the last OP, the
            pack_response_package of response_op and the
            unpack_request_package of the request op.
        """
        (self._actual_ops, self._channels, self._input_channel,
         self._output_channel, self._pack_func,
         self._unpack_func) = self._build_dag(self._response_op)
        _LOGGER.info("[DAG] Succ build DAG")

        tracer = self._tracer
        if tracer is not None:
            tracer.set_channels(self._channels)

        return (self._input_channel, self._output_channel, self._pack_func,
                self._unpack_func)

    def start_prom(self, prometheus_port):
        """
        Serve Prometheus metrics from a background daemon thread.

        A Flask app exposing the ``/metrics`` route is started on
        ``0.0.0.0:prometheus_port``. Each scrape does the following:
        folds the tracer's accumulated ``profile_dict`` into the counters
        imported from ``.prometheus_metrics``, clears ``profile_dict``, and
        returns the text exposition of ``registry``.

        Args:
            prometheus_port: TCP port the metrics HTTP server listens on.

        Returns:
            None. The server thread is a daemon and is never joined.
        """
        import prometheus_client

        from flask import Response, Flask
        from .prometheus_metrics import registry
        from .prometheus_metrics import (
            metric_query_success, metric_query_failure, metric_inf_count,
            metric_query_duration_us, metric_inf_duration_us)
        app = Flask(__name__)

        @app.route("/metrics")
        def requests_count():
            tracer = self._tracer
            # build() treats a missing tracer as valid, so do not dereference
            # it unconditionally: doing so raised AttributeError and turned
            # every scrape into an HTTP 500.
            item = tracer.profile_dict if tracer is not None else {}
            _LOGGER.info("metrics: {}".format(item))
            # {'uci': {'in': 727.443, 'prep': 0.5525833333333333, 'midp': 2.21375, 'postp': 1.32375, 'out': 0.9396666666666667}, 'DAG': {'call_0': 29.479, 'call_1': 8.176, 'call_2': 8.045, 'call_3': 7.988, 'call_4': 7.609, 'call_5': 7.629, 'call_6': 7.625, 'call_7': 8.32, 'call_8': 8.57, 'call_9': 8.055, 'call_10': 7.915, 'call_11': 7.873, 'query_count': 12, 'qps': 1.2, 'succ': 1.0, 'avg': 9.773666666666667, '50': 8.045, '60': 8.055, '70': 8.176, '80': 8.32, '90': 8.57, '95': 29.479, '99': 29.479}}
            if "DAG" in item:
                dag_stats = item["DAG"]
                total = dag_stats["query_count"]
                # "succ" is used as a success ratio: it is multiplied by the
                # query count to get absolute success/failure counts.
                succ = total * dag_stats["succ"]
                fail = total * (1 - dag_stats["succ"])
                # NOTE(review): confirm that the unit of the tracer's "avg" /
                # "midp" values matches the *_us metric names.
                query_duration = total * dag_stats["avg"]
                metric_query_success.inc(succ)
                # NOTE(review): these use the private ``_value`` attribute.
                # That skips the public inc() argument checks. Kept as-is to
                # preserve behaviour.
                metric_query_failure._value.inc(fail)
                metric_query_duration_us._value.inc(query_duration)

                # Every non-"DAG" entry is a per-OP stats dict. Only entries
                # that report a "count" contribute to the inference metrics.
                inf_cnt = 0
                infer_duration = 0.0
                for name in item:
                    if name == "DAG":
                        continue
                    op_stats = item[name]
                    if "count" not in op_stats:
                        continue
                    inf_cnt += op_stats["count"]
                    if "midp" in op_stats:
                        infer_duration += op_stats["count"] * op_stats["midp"]
                metric_inf_count._value.inc(inf_cnt)
                metric_inf_duration_us._value.inc(infer_duration)

            # Reset so that the next scrape only reports the new window.
            # NOTE(review): entries the tracer writes between the read above
            # and this reset are dropped. Confirm whether that is acceptable.
            if tracer is not None:
                tracer.profile_dict = {}
            return Response(prometheus_client.generate_latest(registry),
                            mimetype="text/plain")

        def prom_run():
            app.run(host="0.0.0.0", port=prometheus_port)

        p = threading.Thread(target=prom_run, args=())
        _LOGGER.info("Prometheus Start 2")
        # Set via attribute rather than the ``daemon=`` keyword, because this
        # file still carries a Python 2 code path.
        p.daemon = True
        p.start()

    def start(self):
        """
        Launch every actual OP as worker threads or processes.

        Each OP is first given the DAG's profiler flag, tracer and Prometheus
        flag. ``_is_thread_op`` then decides whether the OP is launched through
        ``start_with_thread`` or ``start_with_process``. When Prometheus is
        enabled, the metrics server is started after all OPs.

        Args:
            None

        Returns:
            _threads_or_proces: list of the started threads or processes.
                They are not joined here; call ``join`` for that.
        """
        workers = []
        self._threads_or_proces = workers
        for op in self._actual_ops:
            op.use_profiler(self._use_profile)
            op.set_tracer(self._tracer)
            op.set_use_prometheus(self._use_prometheus)
            launch = (op.start_with_thread
                      if self._is_thread_op else op.start_with_process)
            workers.extend(launch())
        _LOGGER.info("[DAG] start")

        if not self._use_prometheus:
            # Workers are returned unjoined.
            return self._threads_or_proces
        _LOGGER.info("Prometheus Start 1")
        self.start_prom(self._prometheus_port)
        # Workers are returned unjoined.
        return self._threads_or_proces

    def join(self):
        """
        Block until every started worker thread or process has finished.

        ``None`` placeholders in the worker list are skipped.

        Args:
            None

        Returns:
            None
        """
        live_workers = (w for w in self._threads_or_proces if w is not None)
        for worker in live_workers:
            worker.join()

    def stop(self):
        """
        Stop every channel, then detach the channels from every OP.

        All channels are stopped first. Only after that does each actual OP
        drop its input channel and then its output channels.

        Args:
            None

        Returns:
            None
        """
        for channel in self._channels:
            channel.stop()
        for actual_op in self._actual_ops:
            actual_op.clean_input_channel()
            actual_op.clean_output_channels()
